package com.atguigu.flink.sql.definetime;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * Demo: declaring time attributes inside a Flink SQL {@code CREATE TABLE} statement.
 *
 * <p>The program registers a table named {@code t1} through DDL, prints the schema
 * of that table, and then runs {@code select * from t1} and prints the rows.
 * Because the table is read straight from an external system via a connector,
 * only a {@link TableEnvironment} is created; no stream execution environment
 * is set up.
 *
 * Created by Smexy on 2023/11/20
 */
public class Demo2_SQLDefineTime
{
    /**
     * DDL for table {@code t1}.
     *
     * <ul>
     *   <li>{@code id}, {@code ts}, {@code vc} are the physical columns.</li>
     *   <li>{@code pt} is a computed column defined as {@code PROCTIME()}.</li>
     *   <li>{@code et} is a computed column defined as {@code TO_TIMESTAMP_LTZ(ts,3)}.</li>
     *   <li>The watermark on {@code et} is {@code et} minus an interval of
     *       {@code '0.001' SECOND}.</li>
     *   <li>The table uses the {@code filesystem} connector with the {@code json}
     *       format, reading from {@code data/ws.json}.</li>
     * </ul>
     */
    private static final String CREATE_T1_DDL =
        "CREATE TABLE t1 ("
            + "  id STRING,"
            + "  ts BIGINT,"
            + "  vc INT ,"
            + "  pt AS PROCTIME() ,"
            + "  et AS TO_TIMESTAMP_LTZ(ts,3) ,"
            + " WATERMARK FOR et AS et - INTERVAL '0.001' SECOND "
            + ")  WITH ("
            + "  'connector' = 'filesystem',   "
            + "  'path' = 'data/ws.json', "
            + "  'format' = 'json'             "
            + ")";

    /**
     * Entry point: creates the table environment, registers {@code t1},
     * prints its schema, then queries and prints its contents.
     *
     * @param args command-line arguments (not read)
     */
    public static void main(String[] args) {

        // A table-only environment in streaming mode; the source is a connector-backed
        // table, so no stream execution environment is needed here.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .build());

        // Statements such as CREATE and INSERT are submitted through executeSql.
        tEnv.executeSql(CREATE_T1_DDL);

        // Show the resolved schema of the freshly registered table.
        Table registered = tEnv.from("t1");
        registered.printSchema();

        // Disabled by the original author:
        //tableEnv.sqlQuery("desc table t1 ").execute().print();

        // Run the query and print every row of t1.
        Table allRows = tEnv.sqlQuery("select * from t1");
        allRows.execute().print();

    }
}
